e073db33184a7c7b5dc5af117e917ac03f09780b,cdap-notifications/src/test/java/co/cask/cdap/notifications/NotificationTest.java,NotificationTest,onePublisherOneSubscriberTest,#,142

Before Change



    Assert.assertTrue(feedClient.createFeed(FEED1));
    try {
      final NotificationClient.Publisher<String> publisher = getNotificationClient().createPublisher(FEED1);

      try {
        // Create a subscribing process
        NotificationClient.Subscriber subscriber = getNotificationClient().createSubscriber();

        final AtomicInteger receiveCount = new AtomicInteger(0);
        final AtomicBoolean assertionOk = new AtomicBoolean(true);

        subscriber.add(FEED1, new NotificationHandler<String>() {
          // Reports String as the notification type of this handler, matching the
          // NotificationHandler<String> type argument it is declared with.
          @Override
          public Type getNotificationFeedType() {
            return String.class;
          }

          // Logs the payload, then expects the n-th received notification (counting from
          // zero, tracked by receiveCount) to equal "fake-payload-n". The counter only
          // advances after a successful comparison. On any failure, assertionOk is cleared
          // so the code outside this handler can observe it, and the Throwable is handed
          // to Throwables.propagate.
          @Override
          public void processNotification(String notification, NotificationContext notificationContext) {
            LOG.debug("Received notification payload: {}", notification);
            try {
              final String expectedPayload = "fake-payload-" + receiveCount.get();
              Assert.assertEquals(expectedPayload, notification);
              receiveCount.incrementAndGet();
            } catch (Throwable failure) {
              assertionOk.set(false);
              Throwables.propagate(failure);
            }
          }
        });

        Cancellable cancellable = subscriber.consume();

        // Runnable to publish notifications on behalf of the publisher entity
        Runnable publisherRunnable = new Runnable() {

After Change


      final AtomicInteger receiveCount = new AtomicInteger(0);
      final AtomicBoolean assertionOk = new AtomicBoolean(true);

      Cancellable cancellable = notificationService.subscribe(FEED1, new NotificationHandler<String>() {
        // Reports String as the notification type of this handler, matching the
        // NotificationHandler<String> type argument it is declared with.
        @Override
        public Type getNotificationFeedType() {
          return String.class;
        }

        // Logs the payload, then expects the n-th received notification (counting from
        // zero, tracked by receiveCount) to equal "fake-payload-n". The counter only
        // advances after a successful comparison. On any failure, assertionOk is cleared
        // so the code outside this handler can observe it, and the Throwable is handed
        // to Throwables.propagate.
        @Override
        public void processNotification(String notification, NotificationContext notificationContext) {
          LOG.debug("Received notification payload: {}", notification);
          try {
            final String expectedPayload = "fake-payload-" + receiveCount.get();
            Assert.assertEquals(expectedPayload, notification);
            receiveCount.incrementAndGet();
          } catch (Throwable failure) {
            assertionOk.set(false);
            Throwables.propagate(failure);
          }
        }
      });

      // Runnable to publish notifications on behalf of the publisher entity
      Runnable publisherRunnable = new Runnable() {